package com.charm.utils;

/* 生产者 */
import java.util.Collection;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* 消费者 */
import java.util.List;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Properties;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

/**
 * 部署信息：192.168.0.7 /opt/package/kafka
 * 查看主题：./kafka-topics.sh --list --zookeeper 127.0.0.1:2181
 * 删除主题：./kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic t_mytopic_text
 * 主题详情：./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic t-analysis-text
 * 消费状态：./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group groupid
 * 查看数据：./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181  --topic t-analysis-text --from-beginning
 * 手动复位：./kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest ../config/consumer.properties t-analysis-text
 */

/**
 * 总线生产/消费操作入口对象
 * @author gonglibin
 * 2017.08.30
 */

public class CmKafka {
	Object obj = null;
	
	public static final byte CM_PROD = 0x01;			// 生产者
	public static final byte CM_CONS = 0x02;			// 消费者
	
	public static final Long CM_OSET = -1L;				// 当前偏移
	public static final Long CM_TOUT = 1000L;			// 消费超时
	
	/**
	 * 构造函数
	 * @param m 操作角色
	 * @param g 消费组ID
	 * @param t 总线主题
	 * @param u 连接地址
	 * @return 无
	 */
	public CmKafka(byte m) {obj = CM_PROD == m ? new CmKafkaProd() : new CmKafkaCons();}
	public CmKafka(byte m, String g, String t, String u) {obj = CM_PROD == m ? new CmKafkaProd(t, u) : new CmKafkaCons(g, t, u);}
	
	/**
	 * 总线对象关闭
	 * @param 无
	 * @return 无
	 */
	public void CmKafkaClose() {
		if (obj instanceof CmKafkaProd) {
			CmKafkaProd ckp = (CmKafkaProd) obj;
			ckp.CmKafkaProdClose();
		}
		else {
			CmKafkaCons ckc = (CmKafkaCons) obj;
			ckc.CmKafkaConsClose();
		}
		
		return;
	}
	
	/**
	 * 生产者写入消息
	 * @param v 消息数据CmKafkaProd
	 * @return 无
	 */
	public void CmKafkaProducer(String v) {
		if (obj instanceof CmKafkaProd) {
			CmKafkaProd ckp = (CmKafkaProd) obj;
			ckp.CmKafkaProducer(v);
		}
		
		return;
	}
	
	/**
	 * 生产者写入消息
	 * @param k 写入键
	 * @param v 写入值
	 * @return 无
	 */
	public void CmKafkaProducer(String k, String v) {
		if (obj instanceof CmKafkaProd) {
			CmKafkaProd ckp = (CmKafkaProd) obj;
			ckp.CmKafkaProducer(k, v);
		}
		
		return;
	}
	
	/**
	 * 生产者写入消息
	 * @param l 消息列表
	 * @return 无
	 */
	public void CmKafkaProducer(List<String> l) {
		if (obj instanceof CmKafkaProd) {
			CmKafkaProd ckp = (CmKafkaProd) obj;
			ckp.CmKafkaProducer(l);
		}
		
		return;
	}
	
	/**
	 * 消费者初始偏移
	 * @param p Partition分区数组
	 * @param n 偏移值-1为当前位置
	 * @return 无
	 */
	public void CmKafkaConsSeek(int[] p, Long n) {
		((CmKafkaCons) obj).CmKafkaConsSeek(p, n);
	}
	
	/**
	 * 消费者提交偏移
	 * @param 无
	 * @return 无
	 */
	public void CmKafkaConsCommit() {
		((CmKafkaCons) obj).CmKafkaConsCommit();
	}
	
	/**
	 * 消费者读取消息
	 * @param t 超时时间
	 * @return lst 消息链表
	 */
	public List<String> CmKafkaConsumer(Long t) {
		List<String> lst = null;
		
		if (obj instanceof CmKafkaCons) {
			CmKafkaCons ckc = (CmKafkaCons) obj;
			lst = ckc.CmKafkaConsumer(t);
		}

		return lst;
	}
}

/**
 * Copyright ©2017 Charm Media Group
 * 系统总线生产者类（kafka）
 * @author gonglibin
 * 2017.08.30
 */

class CmKafkaProd {
	private String						top;			// 总线主题
	private String						url;			// 连接地址
	private Producer<String, String>	pro;			// 生产句柄
	
	private final String	CM_LTOP = "t-format-text";
	private final String	CM_LURL = "192.168.0.7:9092";

	/**
	 * 构造函数
	 * @param 无
	 * @return 无
	 */
	public CmKafkaProd() {top = CM_LTOP; url = CM_LURL; CmKafkaInit(url);}
	
	/**
	 * 构造函数
	 * @param t 总线主题
	 * @param u 连接地址
	 * @return 无
	 */
	public CmKafkaProd(String t, String u) {top = t; url = u; CmKafkaInit(url);}
	
	/**
	 * 生产者关闭
	 * @param 无
	 * @return 无
	 */
	public void CmKafkaProdClose() {
		pro.close();
		
		return;
	}
	
	/**
	 * 生产者写入消息
	 * @param v 消息数据
	 * @return 无
	 */
	public void CmKafkaProducer(String v) {
		pro.send(new ProducerRecord<String, String>(top, v));
	}
	
	/**
	 * 生产者写入消息
	 * @param k 写入键
	 * @param v 写入值
	 * @return 无
	 */
	public void CmKafkaProducer(String k, String v) {
		pro.send(new ProducerRecord<String, String>(top, k, v));
	}
	
	/**
	 * 生产者写入消息
	 * @param l 消息列表
	 * @return 无
	 */
	public void CmKafkaProducer(List<String> l) {
		for (String v : l) {
			CmKafkaProducer(v);
		}
	}
	
	/**
     * kafka初始化
     * @param u 连接地址
     * @return 无
     */
	private void CmKafkaInit(String u) {
		Properties pr = new Properties();
		
		pr.put("retries", 0);
		pr.put("acks", "all");
		pr.put("linger.ms", 1);
		pr.put("batch.size", 16384);
		pr.put("bootstrap.servers", u);
		pr.put("buffer.memory", 33554432);
		pr.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		pr.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		
		pro = new KafkaProducer<>(pr);
	}
}

/**
 * Copyright ©2017 Charm Media Group
 * 系统总线消费者类（kafka）
 * @author gonglibin
 * 2017.08.30
 */

class CmKafkaCons {
	private String							gid;			// 消费组ID
	private String							top;			// 总线主题
	private String							url;			// 连接地址
	private KafkaConsumer<String, String>	con;			// 消费句柄
	
	private final int		CM_LSTM = 60000;
	private final String	CM_LGRP = "incise";
	private final String	CM_LTOP = "t-analysis-text";
	private final String	CM_LURL = "192.168.0.7:9092";
	
	/**
	 * 构造函数
	 * @param 无
	 * @return 无
	 */
	public CmKafkaCons() {gid = CM_LGRP; top = CM_LTOP; url = CM_LURL; CmKafkaInit();}
	
	/**
	 * 构造函数
	 * @param g 消费组ID
	 * @param t 总线主题
	 * @param u 连接地址
	 * @return 无
	 */
	public CmKafkaCons(String g, String t, String u) {gid = g; top = t; url = u; CmKafkaInit();}
	
	/**
	 * 消费者关闭
	 * @param 无
	 * @return 无
	 */
	public void CmKafkaConsClose() {
		con.close();
	}
	
	/**
	 * 消费者读取消息
	 * @param t 超时时间
	 * @return lst 消息链表
	 */
	public List<String> CmKafkaConsumer(Long t) {
		List<String> lst = new ArrayList<String>();
		ConsumerRecords<String, String> rds = con.poll(CmKafka.CM_TOUT);
		
		for (ConsumerRecord<String, String> r : rds) {
			lst.add(r.value());
        }
		
		return lst;
	}
	
	/**
	 * 消费者初始偏移
	 * @param p Partition分区数组
	 * @param n 偏移值-1为当前位置
	 * @return 无
	 */
	public void CmKafkaConsSeek(final int[] p, final Long n) {
		if (CmKafka.CM_OSET != n) {
			con.subscribe(Arrays.asList(top), new ConsumerRebalanceListener() {
				@Override
				public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
	
				@Override
				public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
					for (int v : p) {
						con.seek(new TopicPartition(top, v), n);
					}
				}
			});
		}
		else {
			con.subscribe(Arrays.asList(top));
		}
	}
	
	/**
	 * 消费者提交偏移
	 * @param 无
	 * @return 无
	 */
	public void CmKafkaConsCommit() {
		con.commitSync();
	}
	
	 /**
     * 消费者初始化
     * @param 无
     * @return 无
     */
	private void CmKafkaInit() {
		Properties pr = new Properties();
		
		pr.put("group.id", gid);
		pr.put("bootstrap.servers", url);
		pr.put("enable.auto.commit", "false");
		pr.put("session.timeout.ms", CM_LSTM);
		pr.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		pr.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		con = new KafkaConsumer<>(pr);
    }
}